package com.tuandai.log.log4j12.appender;

import com.tuandai.log.kafka.config.Slf4jKafkaConfig;
import com.tuandai.log.kafka.config.Slf4jKafkaConfigSimpleFactory;
import com.tuandai.log.kafka.producer.KafkaProducerConcurrentProcessor;
import com.tuandai.log.kafka.producer.KafkaProducerContext;
import com.tuandai.log.kafka.producer.KafkaProducerProcessor;
import org.apache.log4j.WriterAppender;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;

/**
 * log4j12 的 kafka提供异步插入支持
 * Created by zhangzhenbin on 17-5-25.
 */
public class KafkaConcurrentAppender extends WriterAppender {
    private Slf4jKafkaConfig slf4jKafkaConfig;

    public KafkaConcurrentAppender() throws Exception {
        slf4jKafkaConfig = Slf4jKafkaConfigSimpleFactory.getInstance().parse();
    }

    protected boolean checkEntryConditions() {
        if (slf4jKafkaConfig == null) {
            LogLog.warn("Kafka Config is Invalid!");
            return false;
        } else {
            KafkaProducerContext.init(slf4jKafkaConfig);
        }
        return true;
    }

    @Override
    protected void subAppend(LoggingEvent event) {
        if (event.getLoggerName().contains("org.apache.kafka")) {
            System.err.println("contain org.apache.kafka,break subAppend");
            return;
        }
        String msg = this.layout.format(event);
        KafkaProducerConcurrentProcessor.send(slf4jKafkaConfig, msg);
    }
}
